package com.fintech.pangu.rocketmq.core.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.fintech.pangu.rocketmq.enums.ConsumeMode;
import com.fintech.pangu.rocketmq.enums.SelectorType;
import com.fintech.pangu.rocketmq.metrics.RocketMQConsumeMessageContext;
import com.fintech.pangu.rocketmq.metrics.RocketMQMetricConstants;
import com.fintech.pangu.rocketmq.metrics.RocketMQMetricsCollector;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

import java.util.List;

/**
 * 整合rocketmq consumer监听消息 和 调用对应的RocketMQListener的onMessage()方法处理消费消息的逻辑
 *
 * @author dell
 * @since 1.0.0
 */
@Getter
@Setter
public class DefaultRocketMQListenerContainer implements InitializingBean, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);

    /**
     * Underlying RocketMQ push consumer; created by initRocketMQPushConsumer().
     */
    private DefaultMQPushConsumer consumer;

    /**
     * Listener implementation that receives every consumed message.
     */
    private RocketMQListener rocketMQListener;

    /**
     * Flag telling whether the consumer has been started.
     */
    private volatile boolean started;

    /**
     * Instance name of the consumer client; only applied to the consumer when non-null.
     */
    private String instanceName;

    /**
     * Consumer group.
     */
    private String consumerGroup;

    /**
     * Name server address.
     */
    private String nameServer;

    /**
     * Topic to subscribe to.
     */
    private String topic;

    /**
     * Where consumption starts from; defaults to CONSUME_FROM_LAST_OFFSET.
     */
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

    /**
     * Consume mode; defaults to concurrent consumption.
     */
    private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;

    /**
     * Message model; defaults to clustering.
     */
    private MessageModel messageModel = MessageModel.CLUSTERING;

    /**
     * Filter type; defaults to TAG filtering, which is the only type handled by
     * initRocketMQPushConsumer(). (Original note: SQL92 is supported by RocketMQ
     * starting from 4.1.0-incubating.)
     */
    private SelectorType selectorType = SelectorType.TAG;

    /**
     * Filter expression; defaults to "*".
     */
    private String selectorExpress = "*";

    /**
     * Minimum number of consume threads; defaults to 20.
     */
    private int consumeThreadMin = 20;

    /**
     * Maximum number of consume threads; defaults to 64.
     */
    private int consumeThreadMax = 64;

    /**
     * Maximum number of messages consumed in one batch; defaults to 1.
     */
    private int consumeMessageBatchMaxSize = 1;

    /**
     * Maximum number of re-consume attempts before a failure is treated as final; defaults to 3.
     */
    private int maxReconsumeTime = 3;

    /**
     * Message consume retry strategy
     * -1, no retry, put into DLQ directly
     * 0, broker controls retry frequency
     * >0, client controls retry frequency
     */
    private int delayLevelWhenNextConsume = 0;

    /**
     * Time in milliseconds the current queue is suspended before a retry when ordered
     * consumption returns SUSPEND_CURRENT_QUEUE_A_MOMENT; defaults to 1000 ms.
     * NOTE(review): the original comment mentions "at most 16 retries"; that limit is not
     * enforced anywhere in this class - confirm against the RocketMQ client.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    /** Collector used to report consume metrics. */
    private RocketMQMetricsCollector rocketMQMetricsCollector;


    /**
     * Sets the listener that handles consumed messages.
     *
     * @param rocketMQListener the listener implementation; required before the container is started
     */
    public void setRocketMQListener(RocketMQListener rocketMQListener) {
        this.rocketMQListener = rocketMQListener;
    }

    /**
     * Returns the current value of the started flag.
     *
     * @return {@code true} if the flag marks the consumer as started
     */
    public boolean isStarted() {
        return started;
    }

    /**
     * Updates the started flag. This only changes the flag; it does not start or stop the consumer.
     *
     * @param started the new flag value
     */
    public void setStarted(boolean started) {
        this.started = started;
    }

    /**
     * Returns the configured consumer instance name.
     *
     * @return the instance name, or {@code null} if none was set
     */
    public String getInstanceName() {
        return instanceName;
    }

    /**
     * Sets the consumer instance name.
     *
     * @param instanceName the instance name to use
     * @return this container, so calls can be chained
     */
    public DefaultRocketMQListenerContainer setInstanceName(String instanceName) {
        this.instanceName = instanceName;
        return this;
    }



    /**
     * {@link InitializingBean} callback, invoked by the BeanFactory after all properties
     * have been set. Starts the container by delegating to {@link #start()}.
     *
     * @throws Exception if {@link #start()} fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        start();  // start the container (initializes and starts the consumer)
    }

    /**
     * {@link DisposableBean} callback that releases resources before the BeanFactory
     * destroys this bean.
     *
     * <p>Shuts down the underlying consumer when one exists and clears the started flag,
     * so that {@link #isStarted()} no longer reports a running container afterwards.
     *
     * @throws Exception declared by the {@link DisposableBean} contract
     */
    @Override
    public void destroy() throws Exception {
        // The consumer is only created inside start(); if start() was never called or failed
        // during validation there is nothing to shut down, and dereferencing it would throw NPE.
        if (consumer != null) {
            consumer.shutdown();
        }
        // Keep the flag consistent with the real state of the consumer.
        setStarted(false);
    }


    /**
     * Starts the container: builds the push consumer, starts it and marks the container as started.
     *
     * @throws MQClientException     if the consumer cannot be initialized or started
     * @throws IllegalStateException if the container is already marked as started
     */
    public synchronized void start() throws MQClientException {
        // refuse to start twice
        final boolean alreadyRunning = isStarted();
        if (alreadyRunning) {
            throw new IllegalStateException("rocketmq消费端容器实例已经启动. " + this.toString());
        }

        // build and configure the consumer, then start it
        initRocketMQPushConsumer();
        consumer.start();

        // only flag the container as started once the consumer is really running
        setStarted(true);
        logger.info("rocketmq消费端容器实例启动成功: {}", this.toString());
    }

    /**
     * Creates and configures the push consumer from this container's properties.
     *
     * <p>Validates the mandatory properties, applies thread, batch and message-model settings,
     * subscribes to the topic and registers the message listener that matches the consume mode.
     *
     * @throws MQClientException        if subscribing to the topic fails
     * @throws IllegalArgumentException if a mandatory property is missing, or the selector type
     *                                  or consume mode is not supported
     */
    private void initRocketMQPushConsumer() throws MQClientException {
        // mandatory properties
        Assert.notNull(rocketMQListener, "属性 'rocketMQListener' 是必需的");
        Assert.notNull(consumerGroup, "属性 'consumerGroup' 是必需的");
        Assert.notNull(nameServer, "属性 'nameServer' 是必需的");
        Assert.notNull(topic, "属性 'topic' 是必需的");

        // publish the new consumer to the field right away, then configure it through a local alias
        final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
        this.consumer = pushConsumer;

        if (instanceName != null) {
            pushConsumer.setInstanceName(instanceName);
        }
        pushConsumer.setNamesrvAddr(nameServer);
        // starting position, CONSUME_FROM_LAST_OFFSET by default
        pushConsumer.setConsumeFromWhere(consumeFromWhere);

        // thread pool bounds: the minimum is clamped so it never exceeds the maximum
        pushConsumer.setConsumeThreadMax(consumeThreadMax);
        pushConsumer.setConsumeThreadMin(Math.min(consumeThreadMin, consumeThreadMax));

        // batch size (1 by default) and message model (clustering by default)
        pushConsumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        pushConsumer.setMessageModel(messageModel);

        // subscription filter: only TAG filtering is supported
        switch (selectorType) {
            case TAG:
                pushConsumer.subscribe(topic, selectorExpress);
                break;
            default:
                throw new IllegalArgumentException("属性[selectorType=" + selectorType + "]非法.");
        }

        // register the listener implementation matching the consume mode
        switch (consumeMode) {
            case CONCURRENTLY:
                pushConsumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            case ORDERLY:
                pushConsumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            default:
                throw new IllegalArgumentException("属性[consumeMode=" + consumeMode + "]非法.");
        }

        // Customization hook: a listener that also implements RocketMQPushConsumerLifecycleListener
        // receives the consumer and may adjust it before it is started.
        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            RocketMQPushConsumerLifecycleListener lifecycleListener =
                    (RocketMQPushConsumerLifecycleListener) rocketMQListener;
            lifecycleListener.prepareStart(pushConsumer);
        }
    }


    /**
     * Resolves the topic of a message batch from its first message.
     *
     * @param msgs the consumed messages, may be {@code null} or empty
     * @return the topic of the first message, or {@code "NaN"} when there is no message
     */
    private String getRealTopic(List<MessageExt> msgs){
        if (msgs == null || msgs.isEmpty()) {
            return "NaN";
        }
        return msgs.get(0).getTopic();
    }

    /**
     * Tells whether a message batch is a retry, judged by its first message.
     *
     * @param msgs the consumed messages, may be {@code null} or empty
     * @return {@code true} if the first message has been re-consumed at least once
     */
    private boolean getRetryFlag(List<MessageExt> msgs){
        return msgs != null
                && !msgs.isEmpty()
                && msgs.get(0).getReconsumeTimes() > 0;
    }

    /**
     * 并发消费MessageListenerConcurrently的默认实现类
     */
    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            logger.info(Thread.currentThread().getName() + " 消费端接收到消息记录数: {}", msgs.size());

            long costTimeTotal = 0L;  //消费多条总耗时
            // 循环消费
            for (MessageExt messageExt : msgs) {
                try {
                    logger.debug("消费端接收到消息: {}", messageExt);

                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    costTimeTotal = costTimeTotal + costTime;
                    logger.info("消费端处理消息成功 : {} 耗时: {} 毫秒", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    //默认值0，broker control retry frequency
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);

                    /**
                     * 失败指标收集
                     */
                    RocketMQConsumeMessageContext rocketMQConsumeMessageContext = RocketMQConsumeMessageContext.builder()
                            .success(false)
                            .retry(DefaultRocketMQListenerContainer.this.getRetryFlag(msgs))
                            .topic(DefaultRocketMQListenerContainer.this.getRealTopic(msgs))
                            .consumerGroup(DefaultRocketMQListenerContainer.this.consumerGroup)
                            .messageModel(DefaultRocketMQListenerContainer.this.messageModel.name())
                            .consumeMode(DefaultRocketMQListenerContainer.this.consumeMode.name())
                            .consumeCount((msgs!=null&&msgs.size()>0) ? msgs.size() : 0)
                            .build();

                    //达到最大重试次数，返回消费成功
                    if (messageExt.getReconsumeTimes() < maxReconsumeTime) {
                        logger.error("消费端消费失败, 重新消费次数[" + messageExt.getReconsumeTimes() + "], 消息体:" + messageExt, e);

                        // 记录内部失败
                        rocketMQConsumeMessageContext.setConsumeStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
                        rocketMQMetricsCollector.consumeKernelFailure(rocketMQConsumeMessageContext);

                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } else {
                        logger.error("消费端消费失败, 达到最大重新消费次数[" + maxReconsumeTime + "]. 消息体:" + messageExt, e);
                        try {
                            rocketMQListener.onFinalError(messageExt);
                        } catch (Exception ex) {
                            logger.error("执行onFinalError异常：", ex);
                        }

                        // 记录内部失败
                        rocketMQConsumeMessageContext.setConsumeStatus(RocketMQMetricConstants.ConsumeStatus.CONSUME_FAILED);  // 内部最终失败
                        rocketMQMetricsCollector.consumeKernelFailure(rocketMQConsumeMessageContext);

                        // 记录消费端失败
                        rocketMQConsumeMessageContext.setConsumeStatus(RocketMQMetricConstants.ConsumeStatus.CONSUME_FAILED); // 消费端最终失败
                        rocketMQMetricsCollector.consumeFailure(rocketMQConsumeMessageContext);

                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  // 返回CONSUME_SUCCESS，让broker不再重试
                    }
                }
            }

            /**
             * 成功指标收集
             * 只有多条消息都消费成功才是成功，同时记录内部和消费端消费成功
             */
            RocketMQConsumeMessageContext rocketMQConsumeMessageContext = RocketMQConsumeMessageContext.builder()
                    .success(true)
                    .retry(DefaultRocketMQListenerContainer.this.getRetryFlag(msgs))
                    .topic(DefaultRocketMQListenerContainer.this.getRealTopic(msgs))
                    .consumerGroup(DefaultRocketMQListenerContainer.this.consumerGroup)
                    .messageModel(DefaultRocketMQListenerContainer.this.messageModel.name())
                    .consumeMode(DefaultRocketMQListenerContainer.this.consumeMode.name())
                    .consumeStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name())
                    .consumeCount((msgs!=null&&msgs.size()>0) ? msgs.size() : 0)
                    .duration(costTimeTotal / msgs.size())  // 平均耗时
                    .build();

            // 内部消费成功
            rocketMQMetricsCollector.consumeKernelSuccess(rocketMQConsumeMessageContext);
            // 消费端消费成功
            rocketMQMetricsCollector.consumeSuccess(rocketMQConsumeMessageContext);

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    /**
     * Default {@link MessageListenerOrderly} implementation used for ordered consumption.
     *
     * <p>Every message of the batch is passed to {@code rocketMQListener.onMessage(...)} in turn.
     * When a message fails and its re-consume count is below {@code maxReconsumeTime}, the queue
     * is suspended for {@code suspendCurrentQueueTimeMillis} and the message is retried. Once the
     * limit is reached, {@code rocketMQListener.onFinalError(...)} is invoked and {@code SUCCESS}
     * is returned so that consumption moves on.
     */
    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
        /**
         * Consumes a batch of messages in order.
         *
         * @param msgs    the messages delivered by RocketMQ
         * @param context the consume context; receives the suspend time used before a retry
         * @return {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when a message failed and may still be
         *         retried, otherwise {@code SUCCESS}
         */
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // Nothing to consume: avoids the NPE on msgs.size() for a null batch.
            if (msgs == null || msgs.isEmpty()) {
                logger.warn("消费端接收到空消息列表, 直接返回消费成功");
                return ConsumeOrderlyStatus.SUCCESS;
            }

            logger.info("消费端接收到消息记录数: {}", msgs.size());
            for (MessageExt messageExt : msgs) {
                logger.debug("消费端接收到消息: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    logger.debug("消费端处理消息成功 {} 耗时: {} 毫秒", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    // suspend time before the retry, 1000 ms by default
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);

                    // Strictly "less than", matching the concurrent listener: with "<=" the message
                    // was retried one more time than maxReconsumeTime allows.
                    if (messageExt.getReconsumeTimes() < maxReconsumeTime) {
                        logger.error("消费端消费失败, 重新消费次数[" + messageExt.getReconsumeTimes() + "], 消息体:" + messageExt, e);
                        // pause the current queue for a moment, then retry
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }

                    // retry limit reached: notify the listener and give up on the message
                    logger.error("消费端消费失败, 达到最大重新消费次数[" + maxReconsumeTime + "]. 消息体:" + messageExt, e);
                    try {
                        rocketMQListener.onFinalError(messageExt);
                    } catch (Exception ex) {
                        logger.error("执行onFinalError异常：", ex);
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

}
